-
Notifications
You must be signed in to change notification settings - Fork 4.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
transport: consume per-stream inflow windows ahead of time - large speedup for big messages #1073
Conversation
cc @menghanl, this is the |
cc @sailat |
transport/transport.go
Outdated
@@ -186,14 +186,15 @@ type Stream struct { | |||
recvCompress string | |||
sendCompress string | |||
buf *recvBuffer | |||
dec io.Reader | |||
fc *inFlow | |||
recvQuota uint32 | |||
// The accumulated inbound quota pending for window update. | |||
updateQuota uint32 | |||
// The handler to control the window update procedure for both this | |||
// particular stream and the associated transport. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comments need to be updated to explain the new behavior.
transport/transport.go
Outdated
} | ||
defer func() { s.readFullErr = err }() | ||
s.streamWindowHandler(int64(len(p))) | ||
return io.ReadFull(s.sr, p) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
n, err = io.ReadFull(s.sr, p)
return
instead of
return io.ReadFull(s.sr, p)
So that the return values n and err get updated and the defer function has a valid value of err to populate s.readFullErr.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, FYI, return values are assigned via a return statement, so return io.ReadFull(s.sr, p)
is fine. Example: https://play.golang.org/p/V1qMks7o9s.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah neat!
transport/control.go
Outdated
@@ -149,45 +149,45 @@ type inFlow struct { | |||
mu sync.Mutex | |||
// pendingData is the overall data which have been received but not been | |||
// consumed by applications. | |||
pendingData uint32 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The http2 framer.WriteWindowUpdate(streamID, incr uint32) doesn't support int64. It's better to let the sizes be in uint32.
I think this points to a bug: |
ee57995
to
d54684c
Compare
This is UNSAFE as is - there is a major bug here: if we do a pre-emptive window read that puts the |
9d2a30d
to
605ea24
Compare
latest updates tentatively fix the race in which pending data in a closed stream's recvBuffer isn't given back to the transport. it also moves the API break where |
I tried this PR as a solution to the slow performance, but it just truncated my 512 message stream after 3 messages. It seems half-baked/buggy. |
@glycerine there was a major bug here due to a simple misuse of Though this could still use some more cleanup, I'd like to get this in as a fix to the high-latency/large-message issues, since it doesn't need the user knobs. This should really help for scenarios involving:
Of course more than welcome to retest with the latest commits here - I'd expect about 15x speedup for the situation above (with window bottleneck moving from 64K to 1M) |
311884b
to
eef27ff
Compare
go1.6.3 only failed on travis on |
actually re-running travis has |
ca0352d
to
20d2c2c
Compare
20d2c2c
to
78b0b88
Compare
cc @dfawley btw this is the PR I mentioned offline. Just squashed and rebased to resolve conflicts. I think this is ok for a look. |
transport/control.go
Outdated
if f.pendingData == 0 { | ||
return 0 | ||
if n > http2MaxWindowUpdate { | ||
grpclog.Fatalf("potential window update too large. onRead(n) where n is %v; max n is %v", f.pendingUpdate, http2MaxWindowUpdate) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be a fatalf? When can such a condition occur?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed this
transport/control.go
Outdated
f.mu.Lock() | ||
defer f.mu.Unlock() | ||
if f.loanedWindowSpace > 0 { | ||
grpclog.Fatalf("pre-consuming window space while there is pre-consumed window space still outstanding") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Again, Fatalf might be too harsh.
transport/transport.go
Outdated
} | ||
defer func() { r.readFullErr = err }() | ||
committedReadAmount := min(maxSingleStreamWindowUpdate, uint32(len(p))) | ||
r.loanSpaceInStreamWindow(committedReadAmount) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like we will loan out window space every single time. There may be times when we don't even need the loan.
Does it make sense to have a few if conditions like these:
if len(p) - fc.pendingData < fc.limit {
// no need to loan
} else {
delta = (len(p) - fc.pendingData) - fc.limit
loan(delta)
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
still haven't addressed this yet but might revisit
transport/http_util.go
Outdated
@@ -495,6 +495,9 @@ func (f *framer) writeSettingsAck(forceFlush bool) error { | |||
} | |||
|
|||
func (f *framer) writeWindowUpdate(forceFlush bool, streamID, incr uint32) error { | |||
if incr > http2MaxWindowUpdate { | |||
grpclog.Fatalf("attempted window update too large. have %v; max is %v", incr, http2MaxWindowUpdate) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Again Fatalf seems too harsh.
transport/control.go
Outdated
@@ -161,34 +171,51 @@ type inFlow struct { | |||
limit uint32 | |||
|
|||
mu sync.Mutex | |||
// pendingData is the overall data which have been received but not been | |||
// PendingData is the overall data which have been received but not been |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You changed the capitalization here but the field is still unexported. Maybe just remove the "PendingData is" part?
transport/control.go
Outdated
// consumed by applications. | ||
pendingData uint32 | ||
// The amount of data the application has consumed but grpc has not sent | ||
// window update for them. Used to reduce window update frequency. | ||
pendingUpdate uint32 | ||
|
||
// This is temporary space in the incoming flow control that can be granted at convenient times | ||
// to prevent the sender from stalling for lack flow control space. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: lack of flow control
transport/control.go
Outdated
f.mu.Lock() | ||
defer f.mu.Unlock() | ||
if f.loanedWindowSpace > 0 { | ||
grpclog.Fatalf("pre-consuming window space while there is pre-consumed window space still outstanding") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: extra space between still & outstanding.
transport/control.go
Outdated
} | ||
f.loanedWindowSpace = n | ||
|
||
if f.loanedWindowSpace+f.pendingUpdate >= f.limit/4 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor improvement:
wu := f.pendingUpdate + f.loanedWindowSpace
if wu >= f.limit/4 {
f.pendingUpdate = 0
return wu
}
return 0
transport/control.go
Outdated
} | ||
f.loanedWindowSpace = n | ||
|
||
if f.loanedWindowSpace+f.pendingUpdate >= f.limit/4 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the significance of the 25% here? Maybe make that a const if it's arbitrary?
transport/transport.go
Outdated
if s.readFullErr != nil { | ||
return 0, s.readFullErr | ||
} | ||
defer func() { s.readFullErr = err }() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you know if the compiler optimizes this?
If not, this would be better:
n, err = io.ReadFull()
s.readFullErr = err
return n, err
transport/transport.go
Outdated
if r.readFullErr != nil { | ||
return 0, r.readFullErr | ||
} | ||
defer func() { r.readFullErr = err }() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similar comment regarding defer vs. straight-line.
// before server starts reading. | ||
time.Sleep(2 * time.Second) | ||
_, err := s.ReadFull(p) | ||
if err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps log the error
} | ||
p := make([]byte, len(req)) | ||
_, err := s.ReadFull(p) | ||
if err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same thing
p := make([]byte, len(expectedResponseLarge)) | ||
|
||
// Give time to server to begin sending before client starts reading. | ||
time.Sleep(2 * time.Second) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this needed on both sides? The server will sleep for 2 seconds before reading and sending response back. Could we just call s.ReadFull and that'll wait for server to send all p bytes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think was needed on both sides only since I wanted to have a good chance that the reader will have started trying to read before any bytes have arrived, on both client and server.
But that said this is racey and I'm wondering if this would be better replaced by unit tests on an inFlow
instance ... doing some experimentation.
transport/transport_test.go
Outdated
@@ -1120,10 +1257,12 @@ func TestClientWithMisbehavedServer(t *testing.T) { | |||
if err := ct.Write(s, d, &Options{Last: true, Delay: false}); err != nil && err != io.EOF { | |||
t.Fatalf("Failed to write: %v", err) | |||
} | |||
// reflect to get the inner recvBufferReader, which reads without doing window updates |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/reflect/type assert/
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
transport/transport_test.go
Outdated
@@ -1182,7 +1321,9 @@ func TestEncodingRequiredStatus(t *testing.T) { | |||
t.Fatalf("Failed to write the request: %v", err) | |||
} | |||
p := make([]byte, http2MaxFrameLen) | |||
if _, err := s.dec.Read(p); err != io.EOF { | |||
// reflect to get the plain recvBufferReader from the stream's stream reader, which doesn't do window updates |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same thing here and below
b9639e3
to
edfb984
Compare
superseded by #1248 |
This is one solution to the problem with grpc vs. http1 that was noticed in #1043. (fixed 64K window causing too many round trips for large messages).
Consumes inflow stream windows based off of requested read size (so e.g., gives stream window update of 2M for 2M grpc message before reading it in, but doesn't change connection window).
This technique used in C-core? cc @ctiller.
Can have big speedup for large messages (e.g, ~10s for 2M request/response down to ~1s, with 150ms RTT).
AFAICS
Stream.Read
isn't safe to do this with sinceio.Read
could be called repeatedly - possible to do but to me a rename seems safer... So this changes transport package to exposeStream.ReadFull(_)
, which tries to be equivalent to currentio.ReadFull("stream", _)